package com.ji.storm.TestStorm.topology;

import com.ji.storm.TestStorm.Spout;
import com.ji.storm.TestStorm.Utils;
import com.ji.storm.TestStorm.bolt.ReportBolt;
import com.ji.storm.TestStorm.bolt.SplitBolt;
import com.ji.storm.TestStorm.bolt.WordCountBolt;
import com.ji.storm.TestStorm.cons.Const;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.AuthorizationException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

/**
 * Entry point that assembles and launches the word-processing topology.
 *
 * <p>The topology is wired as: spout -> split bolt -> count bolt -> report bolt.
 * With no command-line arguments it runs on an in-process {@link LocalCluster};
 * otherwise it is submitted through {@link StormSubmitter} under the name given
 * in {@code args[0]}.
 */
public class WordTopology {
	private static final String SPOUT = "spout-spout";
	private static final String SPLIT = "split-bolt";
	private static final String COUNT = "count-bolt";
	private static final String REPORT = "report-bolt";
	private static final String TOP = "top-topology";

	/**
	 * Builds the topology and either runs it locally or submits it to a cluster.
	 *
	 * @param args when empty, the topology runs on a {@link LocalCluster} under the
	 *             name {@code "top-topology"}; otherwise {@code args[0]} is used as
	 *             the topology name for {@link StormSubmitter}
	 * @throws InterruptedException      declared for the local-mode wait
	 *                                   (presumably thrown by {@code Utils.waitForSeconds};
	 *                                   verify against that helper)
	 * @throws AlreadyAliveException     propagated from topology submission
	 * @throws InvalidTopologyException  propagated from topology submission
	 * @throws AuthorizationException    propagated from topology submission
	 */
	public static void main(String[] args)
			throws InterruptedException, AlreadyAliveException, InvalidTopologyException, AuthorizationException {
		Spout spout = new Spout();
		SplitBolt split = new SplitBolt();
		WordCountBolt count = new WordCountBolt();
		ReportBolt report = new ReportBolt();

		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout(SPOUT, spout);
		// Split bolt: parallelism hint 2, 4 tasks, fed from the spout via shuffle grouping.
		builder.setBolt(SPLIT, split, 2).setNumTasks(4).shuffleGrouping(SPOUT);
		// Count bolt: tuples are grouped on the Const.WORD field of the split bolt's output.
		builder.setBolt(COUNT, count).fieldsGrouping(SPLIT, new Fields(Const.WORD));
		// Report bolt: receives the count bolt's output via global grouping.
		builder.setBolt(REPORT, report).globalGrouping(COUNT);

		Config conf = new Config();
		// Request 2 worker processes.
		conf.setNumWorkers(2);

		if (args.length == 0) {
			LocalCluster cluster = new LocalCluster();
			try {
				cluster.submitTopology(TOP, conf, builder.createTopology());
				// Let the topology run before tearing it down
				// (presumably blocks for about 10 seconds; see Utils.waitForSeconds).
				Utils.waitForSeconds(10);
				cluster.killTopology(TOP);
			} finally {
				// Always release the in-process cluster, even if submit/wait/kill failed.
				cluster.shutdown();
			}
		} else {
			StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
		}
	}
}
